【AWS Lambda】処理対象ファイルが大きい時に、メモリの大きな関数に代替処理をさせる際のTips
こんにちは、平野です。
毎日のログなど、定期的に出力されるファイルに処理をする場合、 基本的には同じくらいのサイズのファイルが対象になるかと思います。 これをLambdaで処理する場合、同じようなファイルサイズなので、 メモリサイズもそれに見合った量を設定することで、無駄な課金を防ぐことができます。
しかし何か特別なイベントがあり、通常よりも大きなサイズのファイルが出力されるような場合、 ギリギリに設定したメモリ量では処理が失敗してしまうことになります。
これを避けるために、今回はLambda内部で対象ファイルサイズを確認して、 一定以上だったら大きめのメモリを確保しておいたLambda関数で実行するような仕掛けを作ってみました。 特別難しいことはなく、そのままコーディングすれば良いのですが、 いくつか覚えておいた方が良さそうなことがあったのでブログにします。
想定しているケース
今回対処しようとしたケースでは、大きな容量のファイルが出力されることは滅多になく、 容量が大きな場合は通常の10倍程度の量まで想定されるようなケースでした。
もし容量の振れ幅が大きくない場合は、デフォルトのメモリ量を増やしてしまった方が、 結局管理面でアドバンテージがあるかもしれません。 また、メモリ量を増やすとそれに対応してCPU性能も上がります1ので、 一度メモリを上げてみて、処理時間が短縮されて料金的に相殺されるケースでないか確認してみることをオススメします。
アプリケーションとしては、pythonで動くコードをServerlessFrameworkを使ってデプロイしています。
同じ処理をするコードであれば、一つにまとめておきたかったので、
lambda_handler
自体は一つで、それをメモリサイズだけ異なる2パターンの関数としてデプロイするようにしました。
なお、この記事では解説は割愛させて頂きますが、 呼び出す側のLambda関数には、他のLambda関数や、SNSトピックへアクセスできる ロールが付与されていることを確認してください。
コード
Serverless.yml
Serverless.ymlの内容としては以下のようになります。
functions: SampleApp: handler: sample_app.lambda_handler memorySize: 256 timeout: 60 reservedConcurrency: 50 events: - (何かしらの呼び出しイベント) SampleApp_forLargeFile: handler: sample_app.lambda_handler memorySize: 1024 timeout: 60 reservedConcurrency: 50
ハンドラとなるpythonコードsample_app.py
は1つで、メモリサイズだけを変更して2つの関数を配置します。
sample_app.py
FILE_SIZE_THRESH = int(os.environ.get("FILE_SIZE_THRESH", "1000000")) def lambda_handler(event, context): if context.function_name.endswith("_forLargeFile"): print("メモリサイズ{}MBの関数で実行します".format(context.memory_limit_in_mb)) else: # ファイルサイズのチェック bucket = get_bucket(event) key = get_key(event) size = check_object_size(bucket, key) if size >= FILE_SIZE_THRESH: print("容量の大きなファイルを検知しました: {}Bytes".format(size)) import boto3 lambda_client = boto3.client("lambda") lambda_client.invoke( FunctionName=context.function_name + "_forLargeFile", InvocationType="Event", Payload=json.dumps(event)) return # 以降に本処理を書く def get_bucket(event): # 省略 def get_key(event): # 省略 def check_object_size(bucket, key): s3 = boto3.client('s3') return s3.head_object(Bucket=bucket, Key=key)['ContentLength']
lambda_handler
の直後で、関数名によって場合分けをしています。
この例では_forLargeFile
と言うsuffixがついていたら大きいファイル向けの関数である、
というコードになっています。
ファイルサイズをチェックして、閾値を超えていたら別のLambda関数を呼びます。
基本的にはlambda_client
を作成してinvoke
メソッドを呼ぶだけですが、
注意点としては、関数の呼び出しタイプとして
InvocationType="Event"
と指定することです。
Event
を指定すると非同期での呼び出しになりますが、
デフォルトでは同期呼び出しになってしまうため、呼び出し先の関数が終わるまで待機してしまいます。
同期呼び出しでは呼び出し元のLambda関数がずっと課金時間になってしまいますし、
呼び出し元の関数側でタイムアウトが発生したら、呼び出し先関数も一緒に終了してしまいます。
ということで、必ず呼び出しタイプはEvent
を指定しましょう。
また、eventの渡し方としては
Payload=json.dumps(event))
のようにします。
こうすれば、event
をそのまま渡すことができます。
VPCLambdaの場合
VPC内ではないLambda関数の場合は上記の方法で良いのですが、VPC内Lambda関数の場合には一工夫が必要です。
(NAT Gatewayの設置されていない)VPC内で実行されるLambda関数の場合、Lambda関数を呼び出すAPIを実行することができません。 呼び出す側、呼び出される側共に同じVPC内に存在していたとしても、 一度インターネットを経由しなければならないためです。
VPCからインターネットへ出るのであればNAT Gatewayが思い浮かびます2が、 今回はVPCエンドポイントが設定されたSNSが既にあったので、SNSを経由してLambda関数を呼び出す方法を取りました。 NAT Gatewayが既に設置されている場合はそれを利用した方が無駄なお金はかからずに済みます。
SNSのVPCエンドポイントを作成する方法は下記のブログを参照してみて下さい。
Serverless.yml
SNSを経由する場合のServerless.yml
の例としては以下のようになります。
events
に新規に作るsns
の名前を渡せば新しいリソースを作成してLambda関数との紐付けまでやってくれますので、
その結果作成されるARNを環境変数として定義しておいて(名前がわかれば予めARNまで決められる)、
Lambda関数からはそのARNにpublishを行うという流れになります。
provider: environment: VPC_LAMBDA_INVOKE_TOPIC_ARN: "arn:aws:sns:${self:provider.region}:#{AWS::AccountId}:vpc_lambda_invoke_topic_name" functions: SampleApp: handler: sample_app.lambda_handler memorySize: 256 timeout: 60 reservedConcurrency: 50 events: - (何かしらの呼び出しイベント) SampleApp_forLargeFile: handler: sample_app.lambda_handler memorySize: 1024 timeout: 60 reservedConcurrency: 50 events: - sns: vpc_lambda_invoke_topic_name
vpc_lambda_invoke_topic_name
がSNSトピック名なので、適宜変更してください。
sample_app.py
FILE_SIZE_THRESH = int(os.environ.get("FILE_SIZE_THRESH", "1000000")) def lambda_handler(event, context): if context.function_name.endswith("_forLargeFile"): event = json.loads(event['Records'][0]['Sns']['Message']) print("メモリサイズ{}MBの関数で実行します".format(context.memory_limit_in_mb)) else: bucket = get_bucket(event) key = get_key(event) size = check_object_size(bucket, key) if size >= FILE_SIZE_THRESH: print("容量の大きなファイルを検知しました: {}Bytes".format(size)) import boto3 sns_client = boto3.client("sns") sns_client.publish( TopicArn=os.environ.get("VPC_LAMBDA_INVOKE_TOPIC_ARN"), Subject="Subject", Message=json.dumps(event)) return
lambda_client.invoke
から、sns_client.publish
に変更し、
環境変数のトピックARNを指定しています。
Subject
はなんでも良い(カラ文字列はダメ)ので、適当な文字列を入れます。
そして、Message
に元のevent
を渡します。
呼び出される側としては、
event = json.loads(event['Records'][0]['Sns']['Message'])
のように、SNSで送られてくる形式から、オリジナルのevent
の情報を取り出す必要があります。
SQSの場合はタイムアウト時間に注意
この仕組みをSQSから発火するLambda関数に使用する場合は、 呼び出される側のタイムアウト時間に気をつける必要があります。
SQSの可視性タイムアウト時間は、それと紐づいたLambda関数のタイムアウト時間よりも長く設定する必要があります。 マネジメントコンソールから設定する際には、この関係性がチェックされます。 この条件を満たさない場合、処理が走っている間に別のLambda関数が処理を開始してしまう可能性があるからです。
しかし、呼び出される側のLambda関数はSQSと直接紐づいている訳ではないため、このチェックが働きません。 なので、呼び出される側のLambdaのタイムアウト時間も、SQSの可視性タイムアウトをオーバーしないように気をつけてください。 もしも容量の大きなファイルには処理時間がかかるというような場合には、 DynamoDBを使って処理中のファイルを記録しておくなどの処置が必要になるかと思います。
そのほかのやり方
ブログを公開したところ、社内で、他の方法も教えてもらったので簡単に紹介します
- データ起因でLambda関数がエラーになるなら、StepFunctionsの利用も検討する
- 対象ファイル容量のチェックはVPC外で行い、SNS経由でVPC内Lambdaを起動する
- これならNAT GatewayもVPCエンドポイントも不要
まとめ
メモリ設定量の異なるだけのLambda関数を用意して、 処理対象のファイルのサイズによって大きなメモリの関数に処理を委譲する時のポイントをいくつか紹介しました。
なお、このような仕組みを作る場合には、
Lambda関数起動の無限ループに陥らないか、よく確認するようにしてください!!
自分とほぼ同じ関数を呼ぶため、少しの間違いでも、自分自身を呼ぶ無限ループに入ってしまう可能性があります。
(私は、もちろん、やりましたとも!)
そうするとLambda関数が実行されまくるので、お金もかかりますし、
そのせいでLambda関数の同時起動数制限に引っかかって他の関数に影響が出てしまったりするかと思います。
(reservedConcurrency
を設定しているのはそのためです)
おおよそ一定のサイズのファイルを処理するけど、時々大きいのが来る、というケースがどれほど汎用性があるのかはわかりませんが、 単純にメモリ設定を大きくしてしまうとお金がもったいないですので、こんな方法もあるよ、という話でした。
誰かの参考になれば幸いです。
- https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/resource-model.html ↩
- 社内でツッコミが入った後にこう書いています。私自身はまだ「○○と言ったら××」というストックが少ないです。 ↩